package org.codehaus.activemq.store.jdbc.adapter;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import javax.jms.JMSException;
import javax.transaction.xa.XAException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.ActiveMQXid;
import org.codehaus.activemq.service.SubscriberEntry;
import org.codehaus.activemq.service.Transaction;
import org.codehaus.activemq.service.TransactionManager;
import org.codehaus.activemq.service.impl.XATransactionCommand;
import org.codehaus.activemq.store.jdbc.JDBCAdapter;
import org.codehaus.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler;
import org.codehaus.activemq.store.jdbc.SequenceGenerator;
import org.codehaus.activemq.store.jdbc.StatementProvider;

public class DefaultJDBCAdapter
  implements JDBCAdapter
{
  private static final Log log = LogFactory.getLog(DefaultJDBCAdapter.class);
  protected final CachingStatementProvider statementProvider;
  protected SequenceGenerator sequenceGenerator = new SequenceGenerator();

  protected void setBinaryData(PreparedStatement s, int index, byte[] data) throws SQLException {
    s.setBytes(index, data);
  }

  protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
    return rs.getBytes(index);
  }

  public DefaultJDBCAdapter(StatementProvider provider)
  {
    this.statementProvider = new CachingStatementProvider(provider);
  }

  public DefaultJDBCAdapter() {
    this(new DefaultStatementProvider());
  }

  public SequenceGenerator getSequenceGenerator() {
    return this.sequenceGenerator;
  }

  public void doCreateTables(Connection c) throws SQLException {
    Statement s = null;
    try {
      s = c.createStatement();
      String[] createStatments = this.statementProvider.getCreateSchemaStatments();
      for (int i = 0; i < createStatments.length; i++)
      {
        try
        {
          boolean rc = s.execute(createStatments[i]);
        }
        catch (SQLException e)
        {
          boolean rc;
          log.debug("Statment failed: " + createStatments[i], e);
        }
      }
    }
    finally {
      try {
        s.close();
      }
      catch (Throwable e) {
      }
    }
  }

  public void initSequenceGenerator(Connection c) {
    PreparedStatement s = null;
    ResultSet rs = null;
    try {
      s = c.prepareStatement(this.statementProvider.getFindLastSequenceId());
      rs = s.executeQuery();
      if (rs.next())
        this.sequenceGenerator.setLastSequenceId(rs.getLong(1));
    }
    catch (SQLException e)
    {
      log.warn("Failed to find last sequence number: " + e, e);
    }
    finally {
      try {
        rs.close();
      }
      catch (Throwable e) {
      }
      try {
        s.close();
      }
      catch (Throwable e) {
      }
    }
  }

  public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data) throws SQLException, JMSException {
    PreparedStatement s = null;
    try {
      s = c.prepareStatement(this.statementProvider.getAddMessageStatment());
      s.setLong(1, seq);
      s.setString(2, destinationName);
      s.setString(3, messageID);
      setBinaryData(s, 4, data);
      if (s.executeUpdate() != 1)
        throw new JMSException("Failed to broker message: " + messageID + " in container.  ");
    }
    finally
    {
      try {
        s.close();
      }
      catch (Throwable e) {
      }
    }
  }

  public byte[] doGetMessage(Connection c, long seq) throws SQLException {
    PreparedStatement s = null;
    ResultSet rs = null;
    try
    {
      s = c.prepareStatement(this.statementProvider.getFindMessageStatment());
      s.setLong(1, seq);
      rs = s.executeQuery();

      if (!rs.next()) {
        //arrayOfByte = null;
        return null;
      }
      byte[] arrayOfByte = getBinaryData(rs, 1);
      return arrayOfByte;
    }
    finally
    {
      try
      {
        rs.close();
      }
      catch (Throwable e) {
      }
      try {
        s.close(); } catch (Throwable e) {
      }
    }
    //throw localObject;
  }

  public void doRemoveMessage(Connection c, long seq) throws SQLException
  {
    PreparedStatement s = null;
    try {
      s = c.prepareStatement(this.statementProvider.getRemoveMessageStatment());
      s.setLong(1, seq);
      if (s.executeUpdate() != 1)
        log.error("Could not delete sequenece number for: " + seq);
    }
    finally
    {
      try {
        s.close();
      }
      catch (Throwable e) {
      }
    }
  }

  public void doRecover(Connection c, String destinationName, JDBCAdapter.MessageListResultHandler listener) throws SQLException, JMSException {
    PreparedStatement s = null;
    ResultSet rs = null;
    try
    {
      s = c.prepareStatement(this.statementProvider.getFindAllMessagesStatment());
      s.setString(1, destinationName);
      rs = s.executeQuery();

      while (rs.next()) {
        long seq = rs.getLong(1);
        String msgid = rs.getString(2);
        listener.onMessage(seq, msgid);
      }
    }
    finally
    {
      try {
        rs.close();
      }
      catch (Throwable e) {
      }
      try {
        s.close();
      }
      catch (Throwable e) {
      }
    }
  }

  public void doGetXids(Connection c, List list) throws SQLException {
    PreparedStatement s = null;
    ResultSet rs = null;
    try {
      s = c.prepareStatement(this.statementProvider.getFindAllXidStatment());
      rs = s.executeQuery();

      while (rs.next()) {
        String xid = rs.getString(1);
        try {
          list.add(new ActiveMQXid(xid));
        }
        catch (JMSException e) {
          log.error("Failed to recover prepared transaction due to invalid xid: " + xid, e);
        }
      }
    }
    finally
    {
      try {
        rs.close();
      }
      catch (Throwable e) {
      }
      try {
        s.close();
      }
      catch (Throwable e) {
      }
    }
  }

  public void doRemoveXid(Connection c, ActiveMQXid xid) throws SQLException, XAException {
    PreparedStatement s = null;
    try {
      s = c.prepareStatement(this.statementProvider.getRemoveMessageStatment());
      s.setString(1, xid.toLocalTransactionId());
      if (s.executeUpdate() != 1)
        throw new XAException("Failed to remove prepared transaction: " + xid + ".");
    }
    finally
    {
      try {
        s.close();
      }
      catch (Throwable e)
      {
      }
    }
  }

  public void doAddXid(Connection c, ActiveMQXid xid, byte[] data) throws SQLException, XAException {
    PreparedStatement s = null;
    try
    {
      s = c.prepareStatement(this.statementProvider.getAddMessageStatment());
      s.setString(1, xid.toLocalTransactionId());
      setBinaryData(s, 2, data);
      if (s.executeUpdate() != 1)
        throw new XAException("Failed to store prepared transaction: " + xid);
    }
    finally
    {
      try
      {
        s.close();
      }
      catch (Throwable e) {
      }
    }
  }

  public void doLoadPreparedTransactions(Connection c, TransactionManager transactionManager) throws SQLException {
    PreparedStatement s = null;
    ResultSet rs = null;
    try
    {
      s = c.prepareStatement(this.statementProvider.getFindAllTxStatment());
      rs = s.executeQuery();

      while (rs.next()) {
        String id = rs.getString(1);
        byte[] data = getBinaryData(rs, 2);
        try {
          ActiveMQXid xid = new ActiveMQXid(id);
          Transaction transaction = XATransactionCommand.fromBytes(data);
          transactionManager.loadTransaction(xid, transaction);
        }
        catch (Exception e) {
          log.error("Failed to recover prepared transaction due to invalid xid: " + id, e);
        }
      }
    }
    finally {
      try {
        rs.close();
      }
      catch (Throwable e) {
      }
      try {
        s.close();
      }
      catch (Throwable e)
      {
      }
    }
  }

  public void doSetLastAck(Connection c, String destinationName, String subscriptionID, long seq)
    throws SQLException, JMSException
  {
    PreparedStatement s = null;
    try {
      s = c.prepareStatement(this.statementProvider.getUpdateLastAckOfDurableSub());
      s.setLong(1, seq);
      s.setString(2, subscriptionID);
      s.setString(3, destinationName);

      if (s.executeUpdate() != 1)
        throw new JMSException("Failed to acknowlege message with sequence id: " + seq + " for client: " + subscriptionID);
    }
    finally
    {
      try {
        s.close();
      }
      catch (Throwable e)
      {
      }
    }
  }

  public void doRecoverSubscription(Connection c, String destinationName, String subscriptionID, JDBCAdapter.MessageListResultHandler listener)
    throws SQLException, JMSException
  {
    PreparedStatement s = null;
    ResultSet rs = null;
    try
    {
      s = c.prepareStatement(this.statementProvider.getFindAllDurableSubMessagesStatment());
      s.setString(1, destinationName);
      s.setString(2, subscriptionID);
      rs = s.executeQuery();

      while (rs.next()) {
        long seq = rs.getLong(1);
        String msgid = rs.getString(2);
        listener.onMessage(seq, msgid);
      }
    }
    finally
    {
      try {
        rs.close();
      }
      catch (Throwable e) {
      }
      try {
        s.close();
      }
      catch (Throwable e)
      {
      }
    }
  }

  public void doSetSubscriberEntry(Connection c, String destinationName, String sub, SubscriberEntry subscriberEntry)
    throws SQLException
  {
    PreparedStatement s = null;
    try {
      s = c.prepareStatement(this.statementProvider.getUpdateDurableSubStatment());
      s.setInt(1, subscriberEntry.getSubscriberID());
      s.setString(2, subscriberEntry.getClientID());
      s.setString(3, subscriberEntry.getConsumerName());
      s.setString(4, subscriberEntry.getSelector());
      s.setString(5, sub);
      s.setString(6, destinationName);

      if (s.executeUpdate() != 1) {
        s.close();
        s = c.prepareStatement(this.statementProvider.getCreateDurableSubStatment());
        s.setInt(1, subscriberEntry.getSubscriberID());
        s.setString(2, subscriberEntry.getClientID());
        s.setString(3, subscriberEntry.getConsumerName());
        s.setString(4, subscriberEntry.getSelector());
        s.setString(5, sub);
        s.setString(6, destinationName);

        if (s.executeUpdate() != 1)
          log.error("Failed to store durable subscription for: " + sub);
      }
    }
    finally
    {
      try {
        s.close();
      }
      catch (Throwable e)
      {
      }
    }
  }

  public SubscriberEntry doGetSubscriberEntry(Connection c, String destinationName, String sub)
    throws SQLException
  {
    PreparedStatement s = null;
    ResultSet rs = null;
    try
    {
      s = c.prepareStatement(this.statementProvider.getFindDurableSubStatment());
      s.setString(1, sub);
      s.setString(2, destinationName);
      rs = s.executeQuery();

      if (!rs.next()) {
        //Object localObject1 = null;
        return null;
      }
      SubscriberEntry answer = new SubscriberEntry();
      answer.setSubscriberID(rs.getInt(1));
      answer.setClientID(rs.getString(2));
      answer.setConsumerName(rs.getString(3));
      answer.setDestination(rs.getString(4));

      SubscriberEntry e = answer;
      return e;
    }
    finally
    {
      try
      {
        rs.close();
      }
      catch (Throwable e) {
      }
      try {
        s.close(); } catch (Throwable e) {
      }
    }
   // throw localObject2;
  }
}